
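Setting Up a Kafka Environment with Docker

A Kafka environment requires Java and ZooKeeper to be installed first, which takes a fair number of steps. Using Docker simplifies this considerably.

Installing Docker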

# Remove any older Docker packages.
sudo apt-get remove docker docker-engine docker.io containerd runc
# Install the packages needed to use an apt repository over HTTPS.
sudo apt-get update
sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg-agent \
    software-properties-common
# Add Docker's GPG key and the stable repository.
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository \
    "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
    $(lsb_release -cs) \
    stable"
# Install Docker Engine.
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io

Verify that the installation succeeded:

sudo docker run hello-world

Deploying Kafka in Docker

Pull the images

docker pull wurstmeister/zookeeper

docker pull wurstmeister/kafka

Start the services

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.105.223 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest

The steps above are enough to start Kafka. Next, run docker ps and find the Container ID of the kafka container:

sudo docker exec -it <Container ID> /bin/bash    # enter the container

cd /opt/kafka_2.12-2.4.1/    # go to Kafka's default directory

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka    # create a topic
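
Before moving on to the Python clients, it can help to confirm that the published ports are reachable from the machine that will run them. The following is a minimal sketch using only the standard library; the helper name is_port_open is made up for illustration. An open port only shows that something is listening, not that Kafka itself is healthy.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the containers above running, is_port_open('192.168.105.223', 2181) and is_port_open('192.168.105.223', 9092) would both be expected to return True; a False result usually points to a firewall rule or a container that has exited.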

Testing a Producer and Consumer in Python

Install the Python packages:

apt install python-kafka python3-kafka

Producer code:

I am using Python 3, where leaving out .encode('utf-8') raises an error; under Python 2 it does not.

import time

from kafka import KafkaProducer

# Connect to the broker at the address advertised by the Kafka container.
producer = KafkaProducer(bootstrap_servers=['192.168.105.223:9092'])
topic = 'mykafka'


def test():
    print('begin')
    n = 1
    while n <= 100:
        msg = "{\"top\":" + str(n) + "}"
        # In Python 3 the string must be encoded to bytes before sending.
        producer.send(topic, msg.encode('utf-8'))
        print("send" + msg)
        n += 1
        time.sleep(0.5)
        print('done')
    # Block until any buffered messages have been delivered.
    producer.flush()


test()
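
The .encode('utf-8') call matters because a Python 3 str is Unicode text, while Kafka carries raw bytes; in Python 2 a plain str was already a byte string, which is why the same code ran there without it. As a rough sketch of the same payload built with the standard json module instead of string concatenation (the helper name encode_message is an assumption for illustration, not part of kafka-python):

```python
import json


def encode_message(n):
    """Serialize the counter as compact JSON and return UTF-8 bytes."""
    # separators=(",", ":") drops the default spaces so the output matches
    # the hand-built string in the producer code.
    return json.dumps({"top": n}, separators=(",", ":")).encode("utf-8")


payload = encode_message(97)
print(type(payload).__name__, payload)  # bytes b'{"top":97}'
```

The resulting bytes could be passed directly as the second argument of producer.send(). KafkaProducer also accepts a value_serializer callable, so the encoding step could be moved there instead of being repeated at every call site.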

Consumer code:

from kafka import KafkaConsumer

# Subscribe to the topic and print each record as it arrives.
consumer = KafkaConsumer('mykafka', bootstrap_servers=['192.168.105.223:9092'])
for msg in consumer:
    print(msg)

Then open two terminals, running the consumer code in one and the producer code in the other.

Consumer:

ConsumerRecord(topic='mykafka', partition=0, offset=154, timestamp=1587009112770, timestamp_type=0, key=None, value=b'{"top":97}', checksum=-792520670, serialized_key_size=-1, serialized_value_size=10)
ConsumerRecord(topic='mykafka', partition=0, offset=155, timestamp=1587009113271, timestamp_type=0, key=None, value=b'{"top":98}', checksum=-1791183436, serialized_key_size=-1, serialized_value_size=10)
ConsumerRecord(topic='mykafka', partition=0, offset=156, timestamp=1587009113772, timestamp_type=0, key=None, value=b'{"top":99}', checksum=-10726817, serialized_key_size=-1, serialized_value_size=10)
ConsumerRecord(topic='mykafka', partition=0, offset=157, timestamp=1587009114273, timestamp_type=0, key=None, value=b'{"top":100}', checksum=1039172749, serialized_key_size=-1, serialized_value_size=11)
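
Each record's value arrives as bytes and its timestamp is in milliseconds since the Unix epoch. A small sketch of turning those two fields into something more readable is shown here; the helper names decode_value and timestamp_to_utc are made up for illustration, and the sample inputs are copied from the first record above.

```python
import json
from datetime import datetime, timedelta, timezone


def decode_value(value):
    """Decode a UTF-8 JSON payload (bytes) into a Python object."""
    return json.loads(value.decode("utf-8"))


def timestamp_to_utc(timestamp_ms):
    """Convert a millisecond epoch timestamp into a timezone-aware UTC datetime."""
    # Split with integer arithmetic to avoid floating-point rounding.
    seconds, millis = divmod(timestamp_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


print(decode_value(b'{"top":97}'))                 # {'top': 97}
print(timestamp_to_utc(1587009112770).isoformat())  # 2020-04-16T03:51:52.770000+00:00
```

Inside the consumer loop these would be called as decode_value(msg.value) and timestamp_to_utc(msg.timestamp).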

Producer:

send{"top":98}
done
send{"top":99}
done
send{"top":100}
done
